とりあえずexportしない!Digdag変数・パラメータの使い方全種
こんにちは。DA事業本部の春田です。
Digdagで変数やパラメータを定義する時は _export
するのが一般的かと思いますが、他にも色々やり方があるのでご紹介していきます。環境はGitHubに上げました。
もくじ
Digdagの変数・パラメータ
Digdag公式ドキュメントのConcepts — Export and store parametersによると、標準的なパラメータは3種類あります。
- local
- (digファイルで
+
をつけて定義する)タスクに直接セットするパラメータ
- (digファイルで
- export
- 親タスクから出力されるパラメータ
- store
- 前のタスクで保存されたパラメータ
このうち、 local
が最も優先度が高いパラメータで、同名の export
や store
パラメータは上書きされます。 export
と store
は同じ優先度で、後に定義したもので上書きします。 export
は親タスク配下にしか影響が及ばない一方、 store
は後続のタスクにもパラメータが適応されます。また、 store
はグローバルな変数ではないため、同じ名前の store
パラメータを持つ2つのタスクが並列で実行されても、タスク間のパラメータで影響はありません。この場合、最後に終了したタスクのパラメータが、後続のタスクに適応されます。
また、 digdag server
や digdag run
を起動するときに --params-file オプションで指定できるJSON/YAMLファイル形式のパラメータもあります。こちらはコマンドを叩く時にオプションとして設定するものなので、上の3種のパラメータの外のスコープにあります。digファイル内の上記のパラメータによって値を上書くことはできますが、JSON/YAMLファイルの中身自体は上書きされません。
その他にも、 Param Server という外部DBにパラメータを格納させるタイプもあります。こちらはTTLが90日と設定されている範囲でパラメータを保存しておくことが可能です。使用にはPostgreSQLかRedisのセットアップが必要です。
param_set>: Set a value into a ParamServer as persistent data — Digdag 0.9.41 documentation
パラメータの取得・保存方法
digファイル上
digファイル上でパラメータを取得するには、 ${variable}
のように ${ } を使います。パラメータをlocalとして定義する場合は、 operators>
と同じ階層で定義します。exportパラメータとして定義する場合は、 +task
か operators>
と同じ階層に _export
を記述します。一方、storeパラメータは operators>
の処理の中で定義します。Pythonプログラムの場合は、 digdag.env.store()
メソッドを使用します。
# 例 +print_var_parent: _export: var1: Apple var2: Orange +print_var_child1: echo> ${var1} +print_var_child2: py>: print_var.hello_world var1: Updated Apple by local
# https://docs.digdag.io/workflow_definition.html#using-api import digdag class MyWorkflow(object): def prepare(self): digdag.env.store({"my_param": 2})
--params-file オプションはその名の通り、JSON/YAMLファイルにパラメータを定義し、 digdag
コマンドを叩く際にオプションで指定します。上と同様、 ${variable}
で取得できます。
Param Server は、専用のParam operators param_get>
と param_set>
を用いてパラメータを保存・取得します。一度 param_get>
したパラメータはstoreパラメータとして、後続ジョブにおいて ${variable}
で取得することができます。
+print_var_parent: _export: var1: Apple +set_value: # Key=var2 Value=OrangeでDBに保存 param_set>: var2: Orange +get_value: # DB側でKey=var2のパラメータを、storeパラメータとして定義 param_get>: any_key_name: var2 # any_key_name自体はダミーでしかなく、パラメータに対して何ら影響を及ぼさない +print_var_child: py>: print_var.hello_world
Pythonファイル上
Pythonファイルで変数を取得する方法は2つあります。一つは、 py>
で呼び出す関数の 引数 に、渡したい同名パラメータを定義する方法です。もう一つは、Python Digdag APIの digdag.env.params
メソッドを使う方法です。前者で呼び出すと、Pythonファイルでdigdagライブラリをimportする必要がなくなり、PythonファイルとDigdagをゆるやかに結合することができます。一方後者は、 session_id
などのビルトイン変数もまとめて取得することができるので、ワークフローのメタ情報をPythonで扱いたい場合に便利です。どちらも一長一短ですね。
import digdag import json def hello_world(var1, var2): print("Hello World, {}".format(var1)) print("Hello World, {}".format(var2)) param = digdag.env.params print(param['session_id'])
では、実際に確認していきます。
各種パラメータの検証
今回、基本となるPythonプログラムは以下です。 var1
var2
と、Digdag APIで取得できる全パラメータを出力します。 print_and_store_vars
では、関数の最後で var2
にstoreパラメータを保存します。
import digdag import json def print_vars(var1, var2): print("Hello World, {}".format(var1)) print("Hello World, {}".format(var2)) print(json.dumps(digdag.env.params, indent=4)) def print_and_store_vars(var1, var2): print("Hello World, {}".format(var1)) print("Hello World, {}".format(var2)) print(json.dumps(digdag.env.params, indent=4)) digdag.env.store({"var2": "Updated Orange by store"})
export local store
export、local、storeパラメータのそれぞれの挙動を確認します。 child2
では親タスクで _export
されていたパラメータが、それぞれ上書きされて出力されましたが、 child3
のパラメータには影響を与えていません。また、 child3
で変更を加えた var2
は、 child4
で変更が維持されたままになっています。 child4
では、同じパラメータ var1
に対して、exportとlocalで上書きしていますが、結果的に最も優先度の高いlocalが出力されました。
+parent: _export: var1: Apple var2: Orange +child1: py>: print_var.print_vars +child2: _export: var2: Updated Orange by export py>: print_var.print_vars var1: Updated Apple by local +child3: py>: print_var.print_and_store_vars +child4: _export: var1: Updated Apple by export py>: print_var.print_vars var1: Updated Apple by local
[INFO] (8807@[0:new-project]+print_var+parent+child1) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars Hello World, Apple Hello World, Orange { "session_unixtime": 1583989342, "py>": "print_var.print_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "8253541c-fde1-4741-839b-855d18724adf", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Apple", "var2": "Orange", "attempt_id": 23, "_command": "print_var.print_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:02:22+00:00", "session_id": 23, "session_local_time": "2020-03-12 05:02:22", "session_date_compact": "20200312", "task_name": "+print_var+parent+child1" } [INFO] (8807@[0:new-project]+print_var+parent+child2) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars Hello World, Updated Apple by local Hello World, Updated Orange by export { "session_unixtime": 1583989342, "py>": "print_var.print_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "8253541c-fde1-4741-839b-855d18724adf", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Updated Apple by local", "var2": "Updated Orange by export", "attempt_id": 23, "_command": "print_var.print_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:02:22+00:00", "session_id": 23, "session_local_time": "2020-03-12 05:02:22", "session_date_compact": "20200312", "task_name": "+print_var+parent+child2" } [INFO] (8807@[0:new-project]+print_var+parent+child3) io.digdag.core.agent.OperatorManager: py>: print_var.print_and_store_vars Hello World, Apple Hello World, Orange { "session_unixtime": 1583989342, "py>": "print_var.print_and_store_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "8253541c-fde1-4741-839b-855d18724adf", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Apple", "var2": "Orange", "attempt_id": 23, "_command": "print_var.print_and_store_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:02:22+00:00", "session_id": 23, "session_local_time": "2020-03-12 05:02:22", "session_date_compact": "20200312", "task_name": "+print_var+parent+child3" } [INFO] (8807@[0:new-project]+print_var+parent+child4) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars Hello World, Updated Apple by local Hello World, Updated Orange by store { "session_unixtime": 1583989342, "py>": "print_var.print_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "8253541c-fde1-4741-839b-855d18724adf", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Updated Apple by local", "var2": "Updated Orange by store", "attempt_id": 23, "_command": "print_var.print_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:02:22+00:00", "session_id": 23, "session_local_time": "2020-03-12 05:02:22", "session_date_compact": "20200312", "task_name": "+print_var+parent+child4" }
--params-file
digdag server
を立ち上げる際に、 --params-file
で以下のYAMLファイルを渡しておきます。
var1: 'Apple from params file' var2: 'Orange from params file'
実行するワークフローは、上のワークフローで親タスクで定義しているexportパラメータを抜いたものです。
+parent: +child1: py>: print_var.print_vars +child2: _export: var2: Updated Orange by export py>: print_var.print_vars var1: Updated Apple by local +child3: py>: print_var.print_and_store_vars +child4: _export: var1: Updated Apple by export py>: print_var.print_vars var1: Updated Apple by local
--params-fileで定義したパラメータは、ワークフロー横断的で、exportよりもグローバルになっていることが確認できます。
[INFO] (0174@[0:new-project]+print_var+parent+child1) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars Hello World, Apple from params file Hello World, Orange from params file { "session_unixtime": 1583990259, "py>": "print_var.print_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "9c88fabb-63d2-4c6f-89ad-a6cc61a5f218", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Apple from params file", "var2": "Orange from params file", "attempt_id": 26, "_command": "print_var.print_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:17:39+00:00", "session_id": 26, "session_local_time": "2020-03-12 05:17:39", "session_date_compact": "20200312", "task_name": "+print_var+parent+child1" } [INFO] (0174@[0:new-project]+print_var+parent+child2) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars Hello World, Updated Apple by local Hello World, Updated Orange by export { "session_unixtime": 1583990259, "py>": "print_var.print_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "9c88fabb-63d2-4c6f-89ad-a6cc61a5f218", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Updated Apple by local", "var2": "Updated Orange by export", "attempt_id": 26, "_command": "print_var.print_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:17:39+00:00", "session_id": 26, "session_local_time": "2020-03-12 05:17:39", "session_date_compact": "20200312", "task_name": "+print_var+parent+child2" } [INFO] (0174@[0:new-project]+print_var+parent+child3) io.digdag.core.agent.OperatorManager: py>: print_var.print_and_store_vars Hello World, Apple from params file Hello World, Orange from params file { "session_unixtime": 1583990259, "py>": "print_var.print_and_store_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "9c88fabb-63d2-4c6f-89ad-a6cc61a5f218", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Apple from params file", "var2": "Orange from params file", "attempt_id": 26, "_command": "print_var.print_and_store_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:17:39+00:00", "session_id": 26, "session_local_time": "2020-03-12 05:17:39", "session_date_compact": "20200312", "task_name": "+print_var+parent+child3" } [INFO] (0174@[0:new-project]+print_var+parent+child4) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars Hello World, Updated Apple by local Hello World, Updated Orange by store { "session_unixtime": 1583990259, "py>": "print_var.print_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "9c88fabb-63d2-4c6f-89ad-a6cc61a5f218", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Updated Apple by local", "var2": "Updated Orange by store", "attempt_id": 26, "_command": "print_var.print_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:17:39+00:00", "session_id": 26, "session_local_time": "2020-03-12 05:17:39", "session_date_compact": "20200312", "task_name": "+print_var+parent+child4" }
Param Server
Param Serverにパラメータを保存・取得するには、 param_set>
と param_get>
を用います。
+parent: +child1: py>: print_var.print_and_store_vars +set_values: param_set>: var1: Updated Apple by param_set +child2: py>: print_var.print_vars +get_values: param_get>: var1: var1 +child3: py>: print_var.print_vars
child2
の前に var1
へ param_set
していますが、この段階ではワークフロー上の var1
への影響はありません。 child2
後に param_get
をすることで、 var1
はstoreパラメータとしてワークフロー上の var1
に反映され、 child3
で変更が確認できます。
[INFO] (0176@[0:new-project]+print_var+parent+child1) io.digdag.core.agent.OperatorManager: py>: print_var.print_and_store_vars Hello World, Apple from params file Hello World, Orange from params file { "session_unixtime": 1583991151, "py>": "print_var.print_and_store_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "4c26d3bd-32c5-4551-93b3-c92eb6bebad0", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Apple from params file", "var2": "Orange from params file", "attempt_id": 27, "_command": "print_var.print_and_store_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:32:31+00:00", "session_id": 27, "session_local_time": "2020-03-12 05:32:31", "session_date_compact": "20200312", "task_name": "+print_var+parent+child1" } [INFO] (0176@[0:new-project]+print_var+parent+child2) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars Hello World, Apple from params file Hello World, Updated Orange by store { "session_unixtime": 1583991151, "py>": "print_var.print_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "4c26d3bd-32c5-4551-93b3-c92eb6bebad0", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Apple from params file", "var2": "Updated Orange by store", "attempt_id": 27, "_command": "print_var.print_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:32:31+00:00", "session_id": 27, "session_local_time": "2020-03-12 05:32:31", "session_date_compact": "20200312", "task_name": "+print_var+parent+child2" } [INFO] (0176@[0:new-project]+print_var+parent+child3) io.digdag.core.agent.OperatorManager: py>: print_var.print_vars Hello World, Updated Apple by param_set Hello World, Updated Orange by store { "session_unixtime": 1583991151, "py>": "print_var.print_vars", "last_executed_session_tz_offset": "+0000", "last_executed_session_time": "", "timezone": "UTC", "last_executed_session_date": "", "session_tz_offset": "+0000", "session_uuid": "4c26d3bd-32c5-4551-93b3-c92eb6bebad0", "project_id": 1, "last_executed_session_date_compact": "", "_type": "py", "var1": "Updated Apple by param_set", "var2": "Updated Orange by store", "attempt_id": 27, "_command": "print_var.print_vars", "session_date": "2020-03-12", "last_executed_session_unixtime": "", "last_executed_session_local_time": "", "session_time": "2020-03-12T05:32:31+00:00", "session_id": 27, "session_local_time": "2020-03-12 05:32:31", "session_date_compact": "20200312", "task_name": "+print_var+parent+child3" }
最後に
普段Digdagを使う上ではあまり気にしない仕様ですが、知っておくと生成データやステータス情報を受け渡す時にかなり役立つかと思います。GitHubにDocker環境ごと上げてますので、ぜひ試してみてください。